package com.lyon.demo.protocol.api.core.command;

import com.lyon.demo.protocol.api.core.RemotingCallback;
import com.lyon.demo.storage.common.SystemClock;
import com.lyon.demo.storage.common.util.TimeUtil;
import lombok.Data;
import lombok.SneakyThrows;

import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * @author LeeYan9
 * @since 2022-05-24
 */
@SuppressWarnings("SingleStatementInBlock")
@Data
public class ResponseFuture<R> extends CompletableFuture<R> {

    public ResponseFuture(long requestId) {
        this.requestId = requestId;
    }

    public ResponseFuture(Long requestId, Long timeoutMillis) {
        this(requestId);
        this.timeoutMillis = timeoutMillis;
    }

    private long requestId;

    private RemotingCallback<ResponseFuture<R>> callback;

    private long timeoutMillis = -1;

    private long startMills = SystemClock.now();

    private CountDownLatch notifyLatch = new CountDownLatch(1);

    private volatile R data;
    private volatile Throwable cause;
    private volatile boolean sendOk;

    public RemotingCallback<ResponseFuture<R>> removeCallBack() {
        RemotingCallback<ResponseFuture<R>> backup = this.callback;
        this.callback = null;
        return backup;
    }

    @SuppressWarnings("SingleStatementInBlock")
    @SneakyThrows
    public R waitForMillis(Duration duration) {
        if (Objects.isNull(duration)) {
            notifyLatch.await();
        } else {
            notifyLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        return data;
    }


    @Override
    public boolean completeExceptionally(Throwable cause) {
        error(cause);
        return super.completeExceptionally(cause);
    }

    private void error(Throwable cause) {
        this.cause = cause;
    }

    private void success(R data) {
        this.data = data;
    }

    @Override
    @SneakyThrows
    public ResponseFuture<R> whenComplete(BiConsumer<? super R, ? super Throwable> consumer) {
        super.whenComplete(consumer);
//        waitForMillis(null);
        join(this);
        return this;
    }

    public static void join(ResponseFuture<?>... futures) {
        Arrays.stream(futures).forEach(CompletableFuture::join);
    }

    public boolean isTimeout() {
        return TimeUtil.elapsed(startMills + timeoutMillis) > 0;
    }

    @Override
    public boolean complete(R data) {
        this.success(data);
        return super.complete(data);
    }


}
